{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Building a simple news recommendation system\n",
    "\n",
    "The goal of this example is to train a very simple news recommendation system. We will\n",
    "- prepare the training data in parallel with Ray\n",
    "- train a simple model that classifies article titles as \"popular\" or \"less popular\" using scikit learn and\n",
    "- find good hyperparameter settings for the model with Tune, Ray's parallel hyperparameter optimization library.\n",
    "\n",
    "### Downloading and preparing the training data\n",
    "\n",
    "<html><img src=\"newsreader_1.png\"/></html>\n",
    "\n",
    "First we will download and uncompress 400,000 hackernews submissions. This is a small subset of the articles that have been submitted to https://news.ycombinator.com. The data includes the title of each submission and its score, which roughly corresponds to the number of upvotes. There are 4 batches of JSON files that contain the information, named `submission-1.json` through `submission-4.json`. The first couple lines of the first file will be printed below by the `head` command."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!wget -nc https://s3-us-west-2.amazonaws.com/ray-tutorials/hackernews.zip\n",
    "!unzip -o hackernews.zip\n",
    "!head -n 2 submission-1.json"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import json\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The function below parses a chunk of the data and produces a pandas DataFrame with the titles and scores of the submissions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parse_hn_submissions(path):\n",
    "    with open(path, \"r\") as f:\n",
    "        records = []\n",
    "        for line in f.readlines():\n",
    "            body = json.loads(line)[\"body\"]\n",
    "            records.append({\"data\": body[\"title\"], \"score\": body[\"score\"]})\n",
    "        return pd.DataFrame(records)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We now process all the data chunks and concatenate them into a single dataframe:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_time = time.time()\n",
    "\n",
    "files = [\"submission-\" + str(i) + \".json\" for i in range(1, 5)]\n",
    "records = [parse_hn_submissions(file) for file in files]\n",
    "df = pd.concat(records)\n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time\n",
    "print(\"Took {} seconds to parse the hackernews submissions\".format(duration))\n",
    "\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Modify the code above to parallelize the parsing of the four files with Ray!\n",
    "\n",
    "**Note**: In Binder this will not lead to a speedup (in fact, it will be slower) due to constrained resources (each Binder instance is shared with many other people). On an uncontended EC2 instance, we get **4.25s** for the serial code and **1.34s** for the parallel version."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We use the following lines to determine a cutoff of what we consider a \"good\" article. The median score for articles is 1, so we want to label articles with score higher than that as class \"1\" and everything else as \"0\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df[\"score\"].median()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df[\"target\"] = df[\"score\"] > 1.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are now done preparing the data and can start training a model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training a model\n",
    "\n",
    "<html><img src=\"newsreader_2.png\"/></html>\n",
    "\n",
    "First we split the data into a train and test set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.model_selection import train_test_split\n",
    "train, test = train_test_split(df, test_size=0.2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following defines a pipeline that first converts the title of the submission to a bag of words and then applies an SVM for the actual classification. Note that we are fitting a very simple SVM here due to the computational restrictions of Binder. With more resources, a state-of-the-art model like [BERT](https://ai.googleblog.com/2018/11/open-sourcing-bert-state-of-art-pre.html) would be a better choice, in this case the code would be structured similarly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.pipeline import Pipeline\n",
    "from sklearn.feature_extraction.text import CountVectorizer\n",
    "from sklearn.linear_model import SGDClassifier\n",
    "\n",
    "pipeline = Pipeline([\n",
    "    (\"vect\", CountVectorizer()),\n",
    "    (\"clf\", SGDClassifier(loss=\"hinge\", penalty=\"l2\",\n",
    "                          alpha=0.001,\n",
    "                          max_iter=5, tol=1e-3,\n",
    "                          warm_start=True))])\n",
    "result = pipeline.fit(train.data, train.target)\n",
    "\n",
    "predicted = result.predict(train.data)\n",
    "print(\"Accuracy on the training set is {}\".format(np.mean(predicted == train.target)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predicted = pipeline.predict(test.data)\n",
    "print(\"Accuracy on the test set is {}\".format(np.mean(predicted == test.target)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also classify new titles as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.predict([\"Iconic consoles of the IBM System/360 mainframes, 55 years old today\",\n",
    "                  \"Are Banned Drugs in Your Meat?\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Hyperparameter tuning\n",
    "<html><img src=\"newsreader_3.png\"/></html>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's try to improve these results by doing some hyperparameter tuning. Hyperparameter tuning is the process of finding the best parameters for the learning algorithm. These parameters are typically few numbers like learning rate schedule (i.e. how large steps to take in each iteration), regularization parameters or size of the model. By tuning these knobs, we can typically make the model perform better. Tune supports a number of different algorithms to perform hyperparameter tuning. The simplest is a grid search where we just exhaustively try out different values for the parameters. More sophisticated algorithms include hyperband and population based training. If you want to learn more about these, check out the [tune documentation](https://ray.readthedocs.io/en/latest/tune.html). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import pickle\n",
    "from ray import tune"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First we need to put the training data into the object store (to make sure it will be re-used between training runs), and define the objective function. The objective function `train_func` takes two arguments: The `config` argument which contains the hyperparameters for that hyperparameter run. The `reporter` object can be used to report the performance of these hyperparameters back to tune so it can select the next trial based on the performance of the past ones.\n",
    "\n",
    "**EXERCISE**: Inside the `train_func`, instantiate the training pipeline as above and replace the concrete value of $\\alpha$ with the value `config[\"alpha\"]` that is passed in by Tune.\n",
    "\n",
    "The following function instantiates a model corresponding to the hyperparameters in `config`, runs 5 iterations of training and saves the model parameters to a checkpoint file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_id = ray.put(train)\n",
    "test_id = ray.put(test)\n",
    "\n",
    "def train_func(config, reporter):\n",
    "    pipeline = # TODO: Put in the training pipeline here\n",
    "    train = ray.get(train_id)\n",
    "    test = ray.get(test_id)\n",
    "    for i in range(5):\n",
    "        # Perform one epoch of SGD\n",
    "        X = pipeline.named_steps[\"vect\"].fit_transform(train.data)\n",
    "        pipeline.named_steps[\"clf\"].partial_fit(X, train.target, classes=[0, 1])\n",
    "        with open(\"model.pkl\", \"wb\") as f:\n",
    "            pickle.dump(pipeline, f)\n",
    "        reporter(mean_accuracy=np.mean(pipeline.predict(test.data) == test.target))  # report metrics"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can then get the best setting for the regularization parameter $\\alpha$ as follows. **You should expect the training to take about 4-5 minutes**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "all_trials = tune.run(\n",
    "    train_func,\n",
    "    name=\"news_recommendation\",\n",
    "    # With the \"stop\" parameter, you could also specify a stopping criterion.\n",
    "    config={\"alpha\": tune.grid_search([1e-3, 1e-4, 1e-5, 1e-6])}\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The best model can now be loaded and evaluated like so:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "best_result_path = os.path.join(all_trials.get_best_logdir(\"mean_accuracy\"), \"model.pkl\")\n",
    "with open(best_result_path, \"rb\") as f:\n",
    "    pipeline = pickle.load(f)\n",
    "print(\"Best result was {}\".format(np.mean(pipeline.predict(test.data) == test.target)))\n",
    "print(\"Best result path is {}\".format(best_result_path))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
